package com.mimo.comet.ws.provider;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.mimo.comet.config.LocalCometServerConfig;
import com.mimo.comet.config.LogicMsgPoolFactory;
import com.mimo.comet.config.LogicRoomPoolFactory;
import com.mimo.comet.config.LogicSessionPoolFactory;
import com.mimo.comet.constants.CometType;
import com.mimo.comet.dao.ISessionDao;
import com.mimo.comet.provider.ICommandProvider;
import com.mimo.comet.provider.ISession;
import com.mimo.comet.provider.command.AckCommand;
import com.mimo.comet.provider.command.BaseCommand;
import com.mimo.comet.provider.command.JoinRoomCommand;
import com.mimo.comet.provider.command.LeaveRoomCommand;
import com.mimo.comet.provider.command.P2PMessageCommand;
import com.mimo.comet.provider.command.P2RoomMessageCommand;
import com.mimo.comet.provider.command.echo.BaseEchoCommand;
import com.mimo.comet.provider.command.echo.ErrorEchoCommand;
import com.mimo.comet.provider.command.echo.ListRoomsEchoCommand;
import com.mimo.comet.provider.command.echo.RetrieveUnAckEchoCommand;
import com.mimo.comet.user.service.IUserService;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.dto.msg.BaseDispatchMessage;
import com.mimo.common.logic.dto.msg.P2PMessage;
import com.mimo.common.logic.dto.msg.Peer2RoomMessage;
import com.mimo.common.rpc.proto.BaseResponseProto.BaseResponse;
import com.mimo.common.rpc.proto.LogicMsgRpc.AckMessage;
import com.mimo.common.rpc.proto.LogicRoomRpc.ClientRoomOpsMessage;
import com.mimo.common.rpc.proto.SessionProto.TouchReq;

/**
 * {@link ICommandProvider} for the {@link CometType#WebSocket} comet type.
 *
 * <p>Each incoming {@link BaseCommand} is dispatched on its type to the user service or to one of
 * the logic stubs obtained from the injected pool factories. Commands that produce a payload
 * return a dedicated echo command; commands that only yield a {@link StatusCode} return an
 * {@link ErrorEchoCommand} when that status is not successful.
 */
@Service
public class WebSocketCommandProvider implements ICommandProvider<BaseCommand, BaseEchoCommand> {

  @Autowired
  private ISessionDao sessionDao;

  @Autowired
  private LogicSessionPoolFactory logicSessionPoolFactory;

  @Autowired
  private LogicRoomPoolFactory logicRoomPoolFactory;

  @Autowired
  private LogicMsgPoolFactory logicMsgPoolFactory;

  @Autowired
  private IUserService userService;

  @Autowired
  private LocalCometServerConfig localCometServerConfig;

  /**
   * @return {@link CometType#WebSocket}
   */
  @Override
  public CometType getSupportCometType() {
    return CometType.WebSocket;
  }

  /**
   * Processes one command issued by the given user.
   *
   * @param userId id of the user that sent the command; used to look up the session
   * @param cmd the command to process
   * @return the echo to send back, or {@code null} when no session is found for {@code userId} or
   *     when the command produced neither an echo nor a non-successful status code
   * @throws UnsupportedOperationException if the command type is not handled here
   * @throws ClassCastException if the concrete class of {@code cmd} does not match its type
   * @throws IllegalArgumentException if a room list item cannot be unpacked as a
   *     {@link StringValue}
   */
  @Override
  public BaseEchoCommand process(String userId, BaseCommand cmd) {
    Optional<ISession> session = sessionDao.findByUid(userId);
    if (!session.isPresent()) {
      // No session for this user: nothing is processed and nothing is echoed.
      return null;
    }
    session.get().touch(); // keep the session's latest-heartbeat record up to date

    BaseEchoCommand echo = null;
    StatusCode statusCode = null;
    switch (cmd.getType()) {
      case Logout:
        userService.logout(userId, cmd.getId());
        break;
      case HeartBeat:
        touchLogicSession(userId);
        break;
      case JoinRoom:
        statusCode = joinRoom(userId, (JoinRoomCommand) cmd);
        break;
      case LeaveRoom:
        statusCode = leaveRoom(userId, (LeaveRoomCommand) cmd);
        break;
      case P2P:
        statusCode = dispatchP2P(userId, (P2PMessageCommand) cmd);
        break;
      case P2Room:
        statusCode = dispatchP2Room(userId, (P2RoomMessageCommand) cmd);
        break;
      case Ack:
        acknowledge(userId, (AckCommand) cmd);
        break;
      case ListRooms:
        echo = listRooms(userId, cmd);
        break;
      case RetrieveUnAck:
        echo = retrieveUnAck(userId, cmd);
        break;
      default:
        throw new UnsupportedOperationException("Unsupported command type: " + cmd.getType());
    }

    // A business status code that signals failure is turned into an error echo in one place,
    // but only when the command did not already produce its own echo.
    if (Objects.isNull(echo) && Objects.nonNull(statusCode) && !statusCode.isSuccess()) {
      echo = ErrorEchoCommand.fromStatusCode(cmd.getId(), statusCode);
    }
    return echo;
  }

  /** Sends a touch request for the user, tagged with the local zone, to the logic session stub. */
  private void touchLogicSession(String userId) {
    // The zone is resolved before the stub is obtained, as in the original call order.
    com.mimo.common.rpc.proto.UserProto.ZoneDTO zone = localCometServerConfig.getLocalProtoZone();
    logicSessionPoolFactory.getLogicSessionStub()
        .touch(TouchReq.newBuilder().setUid(userId).setZone(zone).build());
  }

  /** Asks the logic room stub to add the user to the command's target room. */
  private StatusCode joinRoom(String userId, JoinRoomCommand command) {
    return StatusCode.from(logicRoomPoolFactory.getLogicRoomStub()
        .join(ClientRoomOpsMessage.newBuilder().setRoomId(command.getTarget()).setUserId(userId).build()));
  }

  /** Asks the logic room stub to remove the user from the command's target room. */
  private StatusCode leaveRoom(String userId, LeaveRoomCommand command) {
    return StatusCode.from(logicRoomPoolFactory.getLogicRoomStub()
        .leave(ClientRoomOpsMessage.newBuilder().setRoomId(command.getTarget()).setUserId(userId).build()));
  }

  /** Copies the command into a {@link P2PMessage} sent from the user and dispatches it. */
  private StatusCode dispatchP2P(String userId, P2PMessageCommand command) {
    P2PMessage message = new P2PMessage();
    BeanUtils.copyProperties(command, message);
    // The sender is always the session's user, overriding whatever was copied from the command.
    message.setFrom(userId);
    return StatusCode
        .from(logicMsgPoolFactory.getLogicMessageStub().dispatch(BaseDispatchMessage.convert(message)));
  }

  /** Copies the command into a {@link Peer2RoomMessage} sent from the user and dispatches it. */
  private StatusCode dispatchP2Room(String userId, P2RoomMessageCommand command) {
    Peer2RoomMessage message = new Peer2RoomMessage();
    BeanUtils.copyProperties(command, message);
    // The sender is always the session's user, overriding whatever was copied from the command.
    message.setFrom(userId);
    return StatusCode
        .from(logicMsgPoolFactory.getLogicMessageStub().dispatch(BaseDispatchMessage.convert(message)));
  }

  /** Sends an ack for the command's target message id on behalf of the user. */
  private void acknowledge(String userId, AckCommand command) {
    logicMsgPoolFactory.getLogicMessageStub()
        .ack(AckMessage.newBuilder().setUserId(userId).setMsgId(command.getTarget()).build());
  }

  /**
   * Requests the user's room list from the logic room stub and builds the echo from it.
   *
   * @return the echo, or {@code null} when the response carries no plurality value
   */
  private BaseEchoCommand listRooms(String userId, BaseCommand cmd) {
    BaseResponse resp = logicRoomPoolFactory.getLogicRoomStub().listRooms(StringValue.of(userId));
    if (!resp.hasPluralityValue()) {
      // NOTE(review): no echo is produced here and the response is not inspected any further,
      // so the caller gets no reply for this command - confirm this is intended.
      return null;
    }
    Collection<String> rooms = resp.getPluralityValue().getItemsList().stream().map(any -> {
      try {
        return any.unpack(StringValue.class);
      } catch (InvalidProtocolBufferException e) {
        // Rethrown unchecked because the stream lambda cannot throw a checked exception.
        throw new IllegalArgumentException(e);
      }
    }).map(StringValue::getValue).collect(Collectors.toSet());

    ListRoomsEchoCommand listRoomsEcho = new ListRoomsEchoCommand();
    listRoomsEcho.setTarget(cmd.getId());
    listRoomsEcho.setRooms(rooms);
    return listRoomsEcho;
  }

  /** Builds an echo carrying the pending messages the user service returns for the user. */
  private BaseEchoCommand retrieveUnAck(String userId, BaseCommand cmd) {
    List<BaseDispatchMessage> pendingMessages = userService.getPendingMessage(userId);
    RetrieveUnAckEchoCommand retrieveUnAckEcho = new RetrieveUnAckEchoCommand();
    retrieveUnAckEcho.setTarget(cmd.getId());
    retrieveUnAckEcho.setMessages(pendingMessages);
    return retrieveUnAckEcho;
  }

  /**
   * @return always {@code true}; no command is filtered out here
   */
  @Override
  public boolean support(BaseCommand cmd) {
    return true;
  }

}
